package com.paul.mq.dependencies.mqtt.aliyun;

import com.paul.mq.config.AliyunMQTTConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * Spring component that creates, connects and exposes the Aliyun MQ4IoT (MQTT) client.
 *
 * <p>The client is published as the {@code mqttClient} bean by {@link #clientStart()}. Payloads of
 * inbound messages are decoded as UTF-8 and handed to {@code ConsumerMsgManager#doOnMessage}.
 * This class does not subscribe to any topic itself.
 *
 * <p>Created by zmk523@163.com on 2019/8/20 15:08
 */

@Component
public class MQTTConfig {

    private static final Logger log = LoggerFactory.getLogger(MQTTConfig.class);

    /** Pause before every connection attempt, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 5000L;

    /** Maximum time a blocking client action may wait, in milliseconds, so calls never block forever. */
    private static final long SEND_TIMEOUT_MS = 5000L;

    /** A retry budget strictly greater than this value means "retry without limit". */
    private static final int UNLIMITED_RETRY_THRESHOLD = 999;

    @Autowired
    private AliyunMQTTConfig aliyunConfig;
    @Autowired
    private ConsumerMsgManager consumerMessageManager;

    MqttClient mqttClient;


    /**
     * Ensures {@link #mqttClient} refers to a connected client and has this component's callback installed.
     *
     * <p>If the current client is already connected, only the callback is (re)installed. Otherwise any
     * previous client is closed, a new one is created, the callback is registered and the client is connected.
     *
     * @throws Exception if the connection options cannot be built or the client cannot be created or connected
     */
    public void initMQTTClient() throws Exception {
        if (mqttClient != null && mqttClient.isConnected()) {
            mqttClient.setCallback(newCallback());
            return;
        }

        // MQ4IoT clientId format is GroupID@@@DeviceId. The GroupID is applied for in the MQ4IoT console and
        // the DeviceId is chosen by the application. The clientId must be globally unique per TCP connection
        // (two connections sharing one clientId get disconnected) and must not exceed 64 characters.
        final String clientId = aliyunConfig.getAliyunMQTTGroupId() + "@@@" + aliyunConfig.getAliyunMQTTServiceTag();
        final ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(
                aliyunConfig.getAliyunMQTTInstanceId(),
                aliyunConfig.getAliyunAccessKey(),
                aliyunConfig.getAliyunSecretKey(),
                clientId,
                aliyunConfig.getAliyunMQTTCleanSession());

        // Release the previous (disconnected) client before replacing it so its resources are not leaked.
        closeQuietly(mqttClient);

        // The protocol and port of the endpoint must match (for SSL use ssl://endpoint:8883), see
        // https://help.aliyun.com/document_detail/44866.html
        final MqttClient client = new MqttClient(aliyunConfig.getAliyunMQTTEndPoint(), clientId, new MemoryPersistence());
        client.setTimeToWait(SEND_TIMEOUT_MS);
        // Register the callback BEFORE connecting: otherwise the initial connectComplete event is missed and
        // messages delivered right after the connection is established have no handler.
        client.setCallback(newCallback());
        mqttClient = client;
        client.connect(connectionOptionWrapper.getMqttConnectOptions());
    }

    /**
     * Exposes the MQTT client as the {@code mqttClient} Spring bean.
     *
     * <p>Blocks while {@link #retry(int)} tries to establish the connection.
     *
     * @return the value of {@link #mqttClient} once {@code retry} returns
     */
    @Bean("mqttClient")
    public MqttClient clientStart() {
        retry(1000);
        return mqttClient;
    }

    /**
     * Calls {@link #initMQTTClient()} until it succeeds, sleeping {@link #RETRY_INTERVAL_MS} before each attempt.
     *
     * <p>Returns early, with the interrupt flag restored, if the calling thread is interrupted.
     *
     * @param number maximum number of attempts; a value above {@link #UNLIMITED_RETRY_THRESHOLD} retries forever
     */
    private synchronized void retry(int number) {
        final boolean unlimited = number > UNLIMITED_RETRY_THRESHOLD;
        for (int i = 0; unlimited || i < number; i++) {
            try {
                Thread.sleep(RETRY_INTERVAL_MS);
                initMQTTClient();
                return;
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop retrying instead of looping on.
                Thread.currentThread().interrupt();
                log.warn("mqtt client initialization interrupted, giving up", e);
                return;
            } catch (Exception e) {
                log.error("mqtt服务初始化连接失败 ，正在尝试第 {}次重试", i, e);
            }
        }
    }

    /**
     * Closes a client that is about to be replaced; a failure to close is logged and otherwise ignored.
     *
     * @param stale the client to close, may be {@code null}
     */
    private void closeQuietly(MqttClient stale) {
        if (stale == null) {
            return;
        }
        try {
            stale.close();
        } catch (Exception e) {
            // Best effort only: a client that cannot be closed must not prevent a new connection.
            log.warn("failed to close stale mqtt client", e);
        }
    }

    /**
     * Builds the callback that logs connection events and forwards inbound payloads to the consumer manager.
     *
     * @return a new callback instance
     */
    private MqttCallbackExtended newCallback() {
        return new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                log.warn("connect connectionLost", throwable);
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                String result = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
                log.info("receive msg from topic {} , body is {}", topic, result);
                try {
                    consumerMessageManager.doOnMessage(result);
                } catch (Exception e) {
                    // A consumer failure is logged with its stack trace and not propagated to the client.
                    log.error("messageArrived :{}", e.getMessage(), e);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                log.info("send msg succeed：{}", iMqttDeliveryToken.getMessageId());
            }
        };
    }
}
